线程同步

作者:[美]易格恩.阿格佛温(Eugene Agafonov)
译者:黄博文 黄辉兰
改编:陈广
日期:2018-3-15


简介

如上篇文章所看到的,多个线程同时使用共享对象会造成很多问题。同步这些线程使得对共享对象的操作能够以正确的顺序执行是非常重要的。在篇文章我们遇到了一个叫作竞争条件的问题。导致这问题的原因是多线程的执行并没有正确同步。当一个线程执行递增递减操作时,其他线程需要依次等待。这种常见问题通常被称为线程同步

有多种方式来实现线程同步。首先,如果无须共享对象,那么就无须进行线程同步。令人惊奇的是大多数时候可以通过重新设计程序来移除共享状态,从而去掉复杂的同步构造。请尽可能避免在多个线程间使用单一对象。

如果必须使用共享状态,第二种方式是只使用原子操作。这意味着一个操作只占用一个量子的时间,一次就可以完成。所以只有当前操作完成后,其他线程才能执行其他操作。因此,你无须实现其他线程等待当前操作完成,这就避免了使用锁,也排除了死锁的情况。

如果上面的方式不可行,并且程序的逻辑更加复杂,那么我们不得不使用不同的方式来协调线程。方式之一是将等待的线程置于阻塞状态。当线程处于阻塞状态时,只会占用尽可能少的CPU时间。然而,这意味着将引入至少一次所谓的上下文切换(context switch)。上下文切换是指操作系统的线程调试器。该调度器会保存等待的线程的状态,并切换到另一个线程,依次恢复等待的线程的状态。这需要消耗相当多的资源。然而,如果线程要被挂起很长时间,那么这样做是值得的。这种方式又被称为内核模式(kernel-mode),因为只有操作系统的内核才能阻止线程使用CPU的时间。

万一线程只需要等待一小段时间,最好只是简单的等待,而不用将线程切换到阻塞状态。虽然线程等待时会浪费CPU时间,但我们节省了上下文切换耗费的CPU时间。该方式又被称为用户模式(user-mode)。该方式非常轻量,速度很快,但如果线程需要等待较长时间则会浪费大量的CPU时间。

为了利用好这两种方式,可以使用混合模式(hybrid)。混合模式先尝试使用用户模式等待,如果线程等待了足够长的时间,则会切换到阻塞状态以节省CPU资源。

在本间中我们将介绍线程同步这一知识点。我们将讲解如何执行原子操作,以及如何使用.NET Core中现有的同步方式。

执行基本的原子操作

新建一个Sync文件夹,用vscode打开并新建一个console项目,输入如下代码:

using System;
using System.Threading;

namespace Sync
{
    class Program
    {
        class Counter
        {
            private int _count;
            public int Count { get { return _count; } }
            public void Increment()
            {
                Interlocked.Increment(ref _count);
            }
            public void Decrement()
            {
                Interlocked.Decrement(ref _count);
            }
        }
        static void Main(string[] args)
        {
            var c = new Counter();
            var t1 = new Thread(() => TestCounter(c));
            var t2 = new Thread(() => TestCounter(c));
            var t3 = new Thread(() => TestCounter(c));
            t1.Start();
            t2.Start();
            t3.Start();
            t1.Join();
            t2.Join();
            t3.Join();

            Console.WriteLine("Total count: {0}", c.Count);
        }
        static void TestCounter(Counter c)
        {
            for (int i = 0; i < 100000; i++)
            {
                c.Increment();
                c.Decrement();
            }
        }
    }
}

运行结果:0

在上一篇文章中,我们通过锁定对象解决了这个问题。在一个线程获取旧的计数器值并计算后赋予新的值之前,其他线程都被阻塞了。然而,如果我们采用上述方式执行该操作,中途不能停止。而借助于Interlocked类,我们无需锁定任何对象即可获取到正确的结果。Interlocked提供了IncrementDecrementAdd等基本数学操作的原子方法,从而帮助我们在编写Counter类时无需使用锁。

使用Mutex类

Mutex,在单片机操作系统里经常会提到这个词,中文名互斥量,说明它是属于操作系统级别的机制。互斥量是一种原始的同步方式,一般用于进程间的同步。我们可以用它来实现同一程序,同一时间只能运行一个副本。输入如下代码:

static void Main(string[] args)
{
    const string MutexName="iotxfd";
    using (var m=new Mutex(false,MutexName))
    {
        if(!m.WaitOne(TimeSpan.FromSeconds(10),false))
        {
            Console.WriteLine("第二个实例正在运行中!");
        }
        else
        {
            Console.WriteLine("运行中......");
            Console.ReadLine();
            m.ReleaseMutex();
        }
    }
}

这个程序需要使用.exe文件来测试,如果使用Visual Studio来写代码,还比较方便,但如果用的是Visual Studio Code则有些麻烦。下面跟我一起做。

  1. 打开Sync.csproj文件(在根目录下,文件名根据你所建项目名称,后缀名是.csproj就行了)。在<PropertyGroup>节点下添加如下项:
    <RuntimeIdentifiers>win10-x64</RuntimeIdentifiers>
    注意,你的操作系统是win10才能这样写。

最后整个Sync.csproj文件的代码大概是以下的样子:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp2.0</TargetFramework>
    <RuntimeIdentifiers>win10-x64</RuntimeIdentifiers>
  </PropertyGroup>

</Project>

你在读这篇文章时如果.NET Core已经升级到其它版本就有可能会不一样。

  1. 打开终端,输入如下命令:
dotnet publish -c release -r win10-x64

此命令会发布应用程序并生成exe文件。

  1. 打开\bin\release\netcoreapp2.0\win10-x64目录,找到Sync.exe文件。

最终,我们在vscode得到了exe文件。

运行程序:

  1. 在Windows资源管理器中双击Sync.exe文件,打开第一个控制台窗口,这时显示运行中......
  2. 再次双击Sync.exe文件,打开第二个控制台窗口,但未显示任何东西,说明它在等待中。
  3. 在10秒内如果在第一个控制台窗口中使用回车结束程序,则第二个控制台窗口显示运行中......
  4. 如果一直不停止第一个程序的运行,则第二个程序在10秒后会自动关闭。

**注意:**具名的互斥量是全局的操作系统对象!请务必正确关闭互斥量。最好是使用using代码块来包裹互斥量对象。

Mutex跟后面的很多同步类使用起来非常相似,首先使用WaitOne来获取许可,如果此时别的线程或进程已经调用了WaitOne且没有释放,那么线程或进程将进入阻塞状态。当别的线程或进程调用ReleaseMutex后释放了Mutex资源,那么此线程或进程就获取许可从而可以继续执行程序了。

说白了,Mutex和lock类似,WaitOne就是上锁,ReleaseMutex就是解锁。但是需要注意的是Mutex的运行速度和其它同步类相比会慢很多,以前我使用的时候有这样的感受。所以如果有替代品,尽量就不要用它了。

使用SemaphoreSlim类

Semaphore也是单片机操作系统里经常提到的词,中文名信号量,可用于在线程间传递信号。SemaphoreSlim意为信号量的轻量级版本。先执行以下程序:

static void Main(string[] args)
{
    for (int i = 1; i <= 6; i++)
    {
        string threadName = "Thread " + i;
        int secondsToWait = 2 * i;
        var t = new Thread(() => AccessDatabase(threadName, secondsToWait));
        t.Start();
    }
}
static SemaphoreSlim _semaphore = new SemaphoreSlim(4);
static void AccessDatabase(string name, int seconds)
{
    Console.WriteLine("{0} 等待访问资源", name);
    _semaphore.Wait();
    Console.WriteLine("{0} 获准访问资源", name);
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("{0} 结束", name);
    _semaphore.Release();
}

运行结果:

先来看看这句代码:

static SemaphoreSlim _semaphore = new SemaphoreSlim(4);

这里创建了一个SemaphoreSlim,参数4表示同时允许4条线程访问资源。

从运行结果可知,刚开始线程1、3、6、4获准访问资源,2、5等待,1线束后2顶上,3结束后5顶上,直到全部线程访问结束。

SemaphoreSlim的Wait方法用于获取许可,未获许可则阻塞线程,Release则用来释放许可。换个方式说就是:申请使用资源的线程Wait(等待)正在使用资源的线程Release(释放)信号。它的使用方式和Mutex类似,区别在于SemaphoreSlim可允许多条线程访问资源并控制同时访问的数量。

提示:
这里我们使用了混合模式,其允许我们在等待时间很短的情况下无需使用上下文切换。然而,有一个叫作Semaphore的SemaphoreSlim类的老版本。该版本使用纯粹的内核时间(kernel-time)方式。一般没必要使用它,除非是非常重要的场景。我们可以创建一个具名的semaphore,就像一个具名的mutex一样,从而在不同的程序中同步线程。SemaphoreSlime并不使用Windows内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用Semaphore。

使用AutoResetEvent类

先运行以下实例:

private static AutoResetEvent _workerEvent = new AutoResetEvent(false);
private static AutoResetEvent _mainEvent = new AutoResetEvent(false);
static void Process(int seconds)
{
    Console.WriteLine("thread:开始长时间工作...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));//工作
    Console.WriteLine("thread:工作已完成!");
    _workerEvent.Set();//释放
    Console.WriteLine("threard:等待主线程完成它的工作");
    _mainEvent.WaitOne();//等待
    Console.WriteLine("thread:开始第二个操作...");
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("thread:工作已完成!");
    _workerEvent.Set();//释放
}
static void Main(string[] args)
{
    var t = new Thread(() => Process(10));
    t.Start();
    Console.WriteLine("main:等待另一个线程完成工作");
    _workerEvent.WaitOne();//等待
    Console.WriteLine("main:线程的第一个操作已经完成!");
    Console.WriteLine("main:在主线程执行一个操作");
    Thread.Sleep(TimeSpan.FromSeconds(5));//工作
    _mainEvent.Set();//释放
    Console.WriteLine("main:现在在第二个线程中执行第二个操作");
    _workerEvent.WaitOne();//等待
    Console.WriteLine("main:第二个操作已经完成");
}

运行结果:

运行过程我画了张图,图的左边是主线程,右边是Process线程。运行过程跟着箭头走,应该能看懂。

当主程序启动时,定义了两个AutoResetEvent实例。其中_workerEvent是从子线程向主线程发信号,_mainEvent是从主线程向子线程发信号。我们向AutoResetEvent构造方法传入false,定义了两个实例的初始状态为unsignatled。这意味着任何线程调用这两个对象中的任何一个的WaitOne将会被阻塞,直到我们调用了Set方法。如果初始事件状态为true,那么AutoResetEvent实例状态为signaled,如果线程调用WaitOne方法则会被立即处理。然后事件状态自动变为unsignaled,所以需要再对该实例调用一次Set方法,以便让其他的线程对该实例调用WaitOne方法从而继续执行。

然后我们创建了第二个线程,其会执行第一个操作10秒钟,然后等待从第二个线程发出信号。该信号意味着第一个操作已经完成。现在第二个线程在等待主线程的信号。我们对主线程做了一些附加工作,并通过调用_mainEvent.Set方法发送了一个信号。然后等待从第二个线程发出的另一个信号。

AutoResetEvent类采用的是内核时间模式,所以等待时间不能太长。使用ManualResetEventSlim类更好,因为它使用的是混合模式。

使用ManualResetEventSlim类

AutoResetEvent事件一次只允许一个线程执行,而ManualResetEventSlim事件一次允许多个线程执行。

运行以下实例:

static ManualResetEventSlim _mainEvent = new ManualResetEventSlim(false);
static void TravelThroughGates(string threadName, int seconds)
{
    Console.WriteLine("{0} 睡觉去了", threadName);
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine("{0} 等着开门!", threadName);
    _mainEvent.Wait();
    Console.WriteLine("{0} 冲进大门!", threadName);
}

static void Main(string[] args)
{
    var t1 = new Thread(() => TravelThroughGates("Thread 1", 5));
    var t2 = new Thread(() => TravelThroughGates("Thread 2", 6));
    var t3 = new Thread(() => TravelThroughGates("Thread 3", 12));
    t1.Start();
    t2.Start();
    t3.Start();
    Thread.Sleep(TimeSpan.FromSeconds(6));
    Console.WriteLine("main:芝麻开门!");
    _mainEvent.Set();
    Thread.Sleep(TimeSpan.FromSeconds(2));
    _mainEvent.Reset();
    Console.WriteLine("main:关门!");
    Thread.Sleep(TimeSpan.FromSeconds(10));
    Console.WriteLine("main:开门几秒!");
    _mainEvent.Set();
    Thread.Sleep(TimeSpan.FromSeconds(2));
    Console.WriteLine("main:关门!");
    _mainEvent.Reset();
}

运行结果:

我画了张图,图解了程序的运行过程:

图中黄色区域表示开门时间。

ManualResetEventSlim的整个工作方式有点像人群通过大门。而AutoResetEvent事件像一个旋转门,一次只允许一人通过。ManualResetEventSlim的Set方法相当于打开大门,从而允许所有准备好的线程接收信号并继续工作。而_mainEvent.Reset相当于关闭了大门。

使用CountDownEvent类

运行以下代码:

static CountdownEvent _countdown = new CountdownEvent(2);
static void PerformOperation(string message, int seconds)
{
    Thread.Sleep(TimeSpan.FromSeconds(seconds));
    Console.WriteLine(message);
    _countdown.Signal();
}
static void Main(string[] args)
{
    Console.WriteLine("开始两个operations");
    var t1 = new Thread(() => PerformOperation("Operation 1 已完成", 4));
    var t2 = new Thread(() => PerformOperation("Operation 2 已完成", 8));
    t1.Start();
    t2.Start();
    _countdown.Wait();
    Console.WriteLine("两个operations都已经完成");
    _countdown.Dispose();
}

运行结果:

当主程序启动时,通过以下语句创建了一个CountdownEvent实例:

static CountdownEvent _countdown = new CountdownEvent(2);

其构造函数中的2指定了当两个操作完成时会发出信号。然后我们启动了两个线程,当它们执行完成后会使用_countdown.Signal();发出信号。一旦第二个线程完成,主线程会从等待CountDownEvent的状态中返回并继续执行。针对需要等待多个异步操作完成的情形,使用该方式是非常便利的。

然而这有一个重大的缺点。如果调用_countdown.Signal()没达到指定的次数,那么_countdown.Wait()将一直等待。请确保使用CountDownEvent时,所有线程完成后都要调用Signal方法。

使用Barrier类

Barrier类用于组织多个线程及时在某个时刻碰面。其提供了一个回调函数,每次线程调用了SignalAndWait方法后该回调函数会被执行。
运行以下代码:

static Barrier _barrier = new Barrier(2, b => Console.WriteLine("第 {0} 场结束",
            b.CurrentPhaseNumber + 1));
static void PlayMusic(string name, string message, int seconds)
{
    for (int i = 1; i < 3; i++)
    {
        Console.WriteLine("--------------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine("{0} 开始 {1}", name, message);
        Thread.Sleep(TimeSpan.FromSeconds(seconds));
        Console.WriteLine("{0} 结束 {1}", name, message);
        _barrier.SignalAndWait();
    }
}
static void Main(string[] args)
{
    var t1 = new Thread(() => PlayMusic("吉它手", "独奏", 5));
    var t2 = new Thread(() => PlayMusic("歌手", "歌唱", 2));
    t1.Start();
    t2.Start();
}

运行结果:

这里需要分析下Barrier类的构造函数,原型为:

public Barrier(int participantCount, Action<Barrier> postPhaseAction);

要理解这个程序还有点费劲,先改下代码,屏敝Main函数中的两句代码,如下所示:

static void Main(string[] args)
{
    var t1 = new Thread(() => PlayMusic("吉它手", "独奏", 5));
    // var t2 = new Thread(() => PlayMusic("歌手", "歌唱", 2));
    t1.Start();
    // t2.Start();
}

看看运行结果:

--------------------------------------
吉它手 开始 独奏
吉它手 结束 独奏

程序显示完这些内容就停在那了,然后就没有然后了。显然单个线程发两条_barrier.SignalAndWait()指令并不会结束Barrier的等待状态。而必须是两条线程各自发了一条_barrier.SignalAndWait()才能结束等待状态并执行回调函数。

Barrier在线程程迭代运算中非常朋用,可以在每个迭代结束前执行一些计算。当最后一个线程调用SignalAndWait方法时可以在迭代结束时进行交互。

使用ReaderWriterLockSlim类

ReaderWriterLockSlim代表了一个管理资源访问的锁,允许多个线程同时读取,以及独占写。
运行如下代码:

using System;
using System.Threading;
using System.Collections.Generic;

namespace Sync
{
    class Program
    {
        static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
        static Dictionary<int, int> _items = new Dictionary<int, int>();
        static void Read(string threadName)
        {
            while (true)
            {
                try
                {
                    _rw.EnterReadLock();
                    foreach (var key in _items.Keys)
                    {
                        Console.WriteLine("{0}读取键值{1}", threadName, key);
                        Thread.Sleep(500);
                    }
                    Console.WriteLine("{0}完成一次读取", threadName);
                }
                finally
                {
                    _rw.ExitReadLock();
                }
            }
        }
        static void Write(string threadName)
        {
            while (true)
            {
                try
                {
                    int newKey = new Random().Next(250);
                    //稍后要进行一个读取操作,以判断新键是否已存在,使用可升级读锁
                    _rw.EnterUpgradeableReadLock();
                    if (!_items.ContainsKey(newKey))
                    {
                        try
                        {   //确定要进行写入操作后再使用写锁
                            _rw.EnterWriteLock();
                            _items[newKey] = 1;
                            Console.WriteLine("{0}:向Dictionary加入新键{1}", threadName, newKey);
                        }
                        finally
                        {
                            _rw.ExitWriteLock();
                        }
                    }
                    Thread.Sleep(1000);
                }
                finally
                {
                    _rw.ExitUpgradeableReadLock();
                }
            }
        }
        static void Main(string[] args)
        {
            new Thread(() => Read("Read Thread 1")) { IsBackground = true }.Start();
            new Thread(() => Read("Read Thread 2")) { IsBackground = true }.Start();
            new Thread(() => Read("Read Thread 3")) { IsBackground = true }.Start();

            new Thread(() => Write("Write Thread 1")) { IsBackground = true }.Start();
            new Thread(() => Write("Write Thread 2")) { IsBackground = true }.Start();
            Thread.Sleep(TimeSpan.FromSeconds(3));
        }
    }
}

运行结果:

Read Thread 3完成一次读取
Read Thread 1完成一次读取
Read Thread 2完成一次读取
Write Thread 1:向Dictionary加入新键190
Read Thread 2读取键值190
Read Thread 1读取键值190
Read Thread 3读取键值190
Read Thread 3完成一次读取
Read Thread 3读取键值190
Read Thread 1完成一次读取
Read Thread 1读取键值190
Read Thread 2完成一次读取
Read Thread 2读取键值190
Read Thread 2完成一次读取
Read Thread 1完成一次读取
Read Thread 3完成一次读取
Write Thread 1:向Dictionary加入新键82
Read Thread 1读取键值190
Read Thread 3读取键值190
Read Thread 2读取键值190
Read Thread 2读取键值82
Read Thread 3读取键值82
Read Thread 1读取键值82
Read Thread 1完成一次读取
Read Thread 2完成一次读取
Read Thread 3完成一次读取
Write Thread 1:向Dictionary加入新键142
Read Thread 3读取键值190
Read Thread 1读取键值190
Read Thread 2读取键值190
Read Thread 2读取键值82
Read Thread 3读取键值82
Read Thread 1读取键值82

当主程序启动时,同时运行了三个线程来从字典中读取数据,还有另外两个线程向该字典中写入数据。我们使用ReaderWriterLockSlim类来实现线程安全,该类专为这样的场景而设计。

在本例中,我们先生成一个随机数。然后获取读锁并检查该数是否存在于字典的键集合中。如果不存在,将读锁更新为写锁,然后将该新键加入到字典中。始终使用try/finally代码块来确保在捕获锁后一定会释放锁,这是一项好的实践。所有线程都被创建为后台线程,主线程在所有后台线程完成后会等待3秒。

先看看使用了哪些锁:

这里使用了两种锁:读锁允许多线程读取数据而不会阻塞其他线程,写锁在被释放前会阻塞其他线程的所有操作。示例代码中,在写操作前会去判断写入的键是否已经存在,这是一个读操作,如果此时就使用写锁,将会阻塞所有线程,从而浪费大量的时间,此时使用可升级为写的读锁EnterUpgradeableReadLock进行读取,如果发现新键不存在,将要写入,再使用EnterWriteLock升级锁,然后快速执行一次写操作。

总结

讲了这么多同步类,下面做一个总结: